home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import dbus
- import dbus.service as dbus
- import gobject
- import logging
- import os
- import os.path as os
- import sys
- from XKit import xutils, xorgparser
- import time
- import shutil
- import subprocess
-
- try:
- from dbus.lowlevel import HANDLER_RESULT_NOT_YET_HANDLED
- except ImportError:
- from _dbus_bindings import HANDLER_RESULT_NOT_YET_HANDLED
-
-
class AccessDeniedException(dbus.DBusException):
    '''Raised when the caller is not permitted to perform an operation.'''

    # Error name under which this exception is reported over D-Bus.
    _dbus_error_name = 'com.ubuntu.screenresolution.Mechanism.AccessDeniedException'
-
-
class UnsupportedException(dbus.DBusException):
    '''Raised when the requested operation is not supported.'''

    # Error name under which this exception is reported over D-Bus.
    _dbus_error_name = 'com.ubuntu.screenresolution.Mechanism.UnsupportedException'
-
-
class UsageError(dbus.DBusException):
    '''Raised when an operation was invoked incorrectly.'''

    # Error name under which this exception is reported over D-Bus.
    _dbus_error_name = 'com.ubuntu.screenresolution.Mechanism.UsageError'
-
-
class PolicyKitService(dbus.service.Object):
    '''A D-BUS service that uses PolicyKit for authorization.'''

    def _check_permission(self, sender, conn, action, reconnect=True):
        '''
        Verify that the D-Bus peer `sender` is authorized for `action`.

        sender    -- unique bus name of the caller, or None
        conn      -- connection the call arrived on, or None
        action    -- PolicyKit action id to check
        reconnect -- if true, retry the check once when the PolicyKit
                     service is reported as unknown

        Returns None when the caller is authorized, or when both `sender`
        and `conn` are None (no D-Bus peer to check).

        Raises AccessDeniedException if PolicyKit does not authorize the
        caller. Any other dbus.DBusException from the authorization call
        is propagated unchanged.
        '''
        if sender is None and conn is None:
            # No peer information was supplied, so there is nothing to check.
            return None

        # Ask the bus daemon for the PID of the calling process.
        dbus_info = dbus.Interface(
            conn.get_object('org.freedesktop.DBus',
                            '/org/freedesktop/DBus/Bus', False),
            'org.freedesktop.DBus')
        pid = dbus_info.GetConnectionUnixProcessID(sender)

        polkit = dbus.Interface(
            get_service_bus().get_object(
                'org.freedesktop.PolicyKit1',
                '/org/freedesktop/PolicyKit1/Authority', False),
            'org.freedesktop.PolicyKit1.Authority')

        try:
            # The second element of the reply is not needed here.
            # NOTE(review): flags value 1 is AllowUserInteraction in the
            # polkit D-Bus API -- confirm against the polkit reference.
            (is_auth, _, details) = polkit.CheckAuthorization(
                ('unix-process', {'pid': dbus.UInt32(pid, variant_level=1)}),
                action, {'': ''}, dbus.UInt32(1), '', timeout=600)
        except dbus.DBusException as e:
            service_unknown = (
                e._dbus_error_name == 'org.freedesktop.DBus.Error.ServiceUnknown')
            if reconnect and service_unknown:
                # The PolicyKit service was not reachable; try exactly once
                # more (reconnect=False prevents unbounded recursion).
                return self._check_permission(sender, conn, action,
                                              reconnect=False)
            raise

        if not is_auth:
            logging.debug('_check_permission: sender %s on connection %s pid %i is not authorized for %s: %s' % (sender, conn, pid, action, str(details)))
            raise AccessDeniedException(action)
-
-
-
class BackendService(PolicyKitService):
    '''A D-Bus service that PolicyKit controls access to.'''

    INTERFACE_NAME = 'com.ubuntu.ScreenResolution.Mechanism'
    SERVICE_NAME = 'com.ubuntu.ScreenResolution.Mechanism'
    # Idle period, in seconds, after which the service terminates itself.
    IDLE_TIMEOUT = 30

    def __init__(self, connection=None, path='/'):
        '''
        Export the service object on `connection` at object path `path`.

        When `connection` is None the bus returned by get_service_bus()
        is used. The well-known name SERVICE_NAME is claimed on that
        connection and a message filter is installed on it.
        '''
        if connection is None:
            connection = get_service_bus()

        super(BackendService, self).__init__(connection, path)
        # Keep a reference to the BusName object for the service lifetime.
        self.__name = dbus.service.BusName(self.SERVICE_NAME, connection)
        self.__loop = gobject.MainLoop()
        # GLib source id of the idle timer; 0 means "no timer running".
        self.__timeout = 0
        connection.add_message_filter(self.__message_filter)

    def __message_filter(self, connection, message):
        '''
        D-BUS message filter that keeps the service alive,
        as long as it receives messages.

        The idle timer is only restarted if one is already running,
        i.e. after run() has started it.
        '''
        if self.__timeout:
            self.__start_idle_timeout()

        return HANDLER_RESULT_NOT_YET_HANDLED

    def __start_idle_timeout(self):
        '''Restarts the timeout for terminating the service when idle.'''
        if self.__timeout:
            gobject.source_remove(self.__timeout)

        self.__timeout = gobject.timeout_add(self.IDLE_TIMEOUT * 1000,
                                             self.__timeout_cb)

    def __timeout_cb(self):
        '''
        Timeout callback that terminates the service when idle.

        Returns True (keep the timer) while child objects are exported
        below '/', otherwise quits the main loop and returns False.
        '''
        if self.connection.list_exported_child_objects('/'):
            return True

        print('Terminating %s due to inactivity.' % self.SERVICE_NAME)
        self.__loop.quit()
        return False

    def run(self):
        '''Creates a GLib main loop for keeping the service alive.'''
        print('Running %s.' % self.SERVICE_NAME)
        print('Terminating it after %d seconds of inactivity.' % self.IDLE_TIMEOUT)
        self.__start_idle_timeout()
        self.__loop.run()

    @dbus.service.method(dbus_interface=INTERFACE_NAME, in_signature='as',
                         out_signature='b', sender_keyword='sender',
                         connection_keyword='conn')
    def setVirtual(self, virtres, sender=None, conn=None):
        '''
        Set the "Virtual" option of the Display subsection in
        /etc/X11/xorg.conf.

        virtres -- sequence of two strings holding decimal integers; they
                   are joined with a space to form the option value
        sender, conn -- filled in by D-Bus; used for the PolicyKit check

        An existing xorg.conf is first copied to
        xorg.conf.<YYYYmmddHHMMSS>. If the file cannot be read or parsed,
        a new empty configuration is used instead.

        Returns True after the file has been written, or False (after
        logging an error) when `virtres` is not two decimal integers.
        Raises AccessDeniedException if the caller is not authorized.
        '''
        self._check_permission(sender, conn,
                               'com.ubuntu.screenresolution.mechanism.configure')

        # The value is supplied by a D-Bus client and is written verbatim
        # into xorg.conf, so accept nothing but two plain decimal numbers.
        well_formed = len(virtres) == 2 and all(
            dim and all(ch in '0123456789' for ch in dim) for dim in virtres)
        if not well_formed:
            logging.error('setVirtual called with wrong arguments = %r' % (virtres,))
            return False

        virtual = ' '.join(virtres)
        source = '/etc/X11/xorg.conf'
        destination = '/etc/X11/xorg.conf'

        if os.path.exists(source):
            # Keep a timestamped copy before the file is overwritten.
            backup = source + '.' + time.strftime('%Y%m%d%H%M%S')
            shutil.copyfile(source, backup)

        try:
            a = xutils.XUtils(source)
        except (IOError, xorgparser.ParseException):
            # Missing or unparsable file: start from an empty configuration.
            a = xutils.XUtils()

        # The configuration counts as empty when no section type has entries.
        empty = not any(len(a.globaldict[section]) > 0
                        for section in a.globaldict)

        if empty:
            a.makeSection('Device', 'Configured Video Device')
            a.makeSection('Screen', identifier='Configured Screen Device')
            a.addReference('Screen', 'Device', 'Configured Video Device', position=0)
            a.makeSubSection('Screen', 'Display', position=0)
            a.addSubOption('Screen', 'Display', 'Virtual', value=virtual, position=0)
        else:
            devicelen = len(a.globaldict['Device'])
            screenlen = len(a.globaldict['Screen'])
            if screenlen == 0:
                a.makeSection('Screen', identifier='Configured Screen Device')
                if devicelen == 0:
                    device = a.makeSection('Device', 'Configured Video Device')
                else:
                    device = 0
                # NOTE(review): `device` is passed as the position argument
                # of a reference added to the Screen section -- confirm
                # this is the intended index.
                a.addReference('Screen', 'Device', 'Configured Video Device', position=device)
                a.makeSubSection('Screen', 'Display', position=0)
                a.addSubOption('Screen', 'Display', 'Virtual', value=virtual, position=0)
            else:
                # Apply the option to every existing Screen section.
                for screen in a.globaldict['Screen']:
                    a.makeSubSection('Screen', 'Display', position=screen)
                    a.addSubOption('Screen', 'Display', 'Virtual', value=virtual, position=screen)

        a.writeFile(destination)
        return True

    @dbus.service.method(dbus_interface=INTERFACE_NAME, in_signature='s',
                         out_signature='i', sender_keyword='sender',
                         connection_keyword='conn')
    def setDontZap(self, enable, sender=None, conn=None):
        '''
        Try to set the DontZap option in the xorg.conf
        and return the exit code of dontzap.

        enable -- must be '--enable' or '--disable'; it is passed as the
                  only argument to /usr/bin/dontzap

        Returns the exit code of dontzap, or 1 if the program is missing
        or `enable` is not one of the two accepted values.
        Raises AccessDeniedException if the caller is not authorized.
        '''
        self._check_permission(sender, conn,
                               'com.ubuntu.screenresolution.mechanism.dontzap')
        dontzap_file = '/usr/bin/dontzap'

        if not os.path.isfile(dontzap_file):
            logging.error('/usr/bin/dontzap does not exist')
            return 1

        if enable not in ('--enable', '--disable'):
            logging.error('called with wrong arguments = %s' % enable)
            return 1

        logging.debug('calling dontzap with %s' % enable)
        # Argument list form: no shell is involved.
        return subprocess.call([dontzap_file, enable])
-
-
def get_service_bus():
    '''Return a connection to the D-BUS system bus.'''
    system_bus = dbus.SystemBus()
    return system_bus
-
-
def get_service(bus=None):
    '''
    Return a proxy for the D-BUS driven configuration service.

    bus -- bus to look the service up on; when it is false-y the bus
           from get_service_bus() is used

    The returned object is the service's '/' object wrapped in a
    dbus.Interface for BackendService.INTERFACE_NAME.
    '''
    active_bus = bus or get_service_bus()
    proxy = active_bus.get_object(BackendService.SERVICE_NAME, '/')
    return dbus.Interface(proxy, BackendService.INTERFACE_NAME)
-
if __name__ == '__main__':
    import sys
    from sys import argv
    from dbus.mainloop.glib import DBusGMainLoop

    # Make the GLib main loop the default for D-Bus connections.
    DBusGMainLoop(set_as_default=True)

    # Either flag turns on logging of all levels to stderr.
    debug_requested = any(flag in argv for flag in ('--debug', '-d'))
    if debug_requested:
        logging.basicConfig(stream=sys.stderr)
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.NOTSET)

    service = BackendService()
    service.run()
-
-